篇首语:本文由编程笔记#小编为大家整理,主要介绍了重温Linux内核:互斥和同步相关的知识,希望对你有一定的参考价值。
最近迁移到Amlogic平台,需要重温部分内核开发的知识,所以就做下读书笔记,对于互斥和同伴,大体上,这一篇应该差不多够了,后续有空再补充补充。
对于临界资源,常用的就是自旋锁和互斥锁,区别是自旋锁不可打断(睡眠),而互斥锁可以。信号量一般用于实现内核中的同步机制。
当内核有多条执行路径同时访问同一个共享资源时,就会造成竞态。
常见的共享资源有全局变量、静态变量、硬件的寄存器和共同使用的动态分配的同一段内存等。
造成竞态的根本原因就是内核中的代码对共享资源产生了并发(同时)的访问。
(1)硬件中断——当处理器允许中断的时候,一个内核执行路径可能在任何一个时间都会被一个外部中断打断。
(2)软中断和tasklet——通过前面的知识我们知道,内核可以在任意硬中断快要返回之前执行软中断及tasklet,也有可能唤醒软中断线程,并执行tasklet。
(3)抢占内核的多进程环境——如果一个进程在执行时发生系统调用,进入到内核,由内核代替该进程完成相应的操作,此时如有一个更高优先级的进程准备就绪,内核判断在可抢占的条件成立的情况下可以抢占当前进程,然后去执行更高优先级的进程。
(4)普通的多进程环境——当一个进程因为等待的资源暂时不可用时,就会主动放弃CPU,内核会调度另外一个进程来执行。
(5)多处理器或多核 CPU。在同一时刻,可以在多个处理器上并发执行多个程序,这是真正意义上的并发。
并发对共享资源访问就会引起竞态,解决竞态的一个方法就是互斥,也就是对共享资源的串行化访问,即在一条内核执行路径上访问共享资源时,不允许其他内核执行路径来访问共享资源。共享资源有时候又叫作临界资源,而访问共享资源的这段代码又叫作临界代码段或临界区。
造成竞态的原因是一条内核执行路径被中断打断了。如果在访问共享资源之前先将中断屏蔽(禁止),然后再访问共享资源,等共享资源访问完成后再重新使能中断就能避免这种竞态的产生。
需要另外说明的是,如果明确知道是哪一个中断会带来竞态,我们通常应该只屏蔽相应的中断,而不是屏蔽本地CPU的全局中断,这样可以使其他中断照常执行。如果非要屏蔽本地 CPU 的中断,那么应该尽量使用local_irq_save 和 local_irq_restore 这对宏,因为如果使用 local_irq_disable 和 local_irq_enable 这对宏,如果中断在屏蔽之前本身就是屏蔽的,那么 local_irq_enable 会将本来就屏蔽的中断错误地使能,从而造成中断使能状态的前后不一致。而且,中断屏蔽到中断重新使能之间的这段代码不宜过长,否则中断屏蔽的时间过长,将会影响系统的性能。
i++的例子中,可以在i++之前屏蔽中断,之后重新使能中断,代码的形式如下。
unsigned long flags;
//假设i是一个全局变量,即共享资源(临界资源)
local_irq_save(flags);
i++;
local_irq_restore(flags);
使用中断屏蔽来做互斥时的注意事项总结如下。
(1)对解决中断引起的并发而带来的竞态简单高效。
(2)应该尽量使用local_irq_save和local_irq_restore来屏蔽和使能中断。
(3)中断屏蔽的时间不宜过长。
(4)只能屏蔽本地CPU的中断,对多CPU系统,中断也可能会在其他CPU上产生。
如果一个变量的操作是原子性的,即不能再被分割,类似于在汇编级代码上也只要一条汇编指令就能完成,那么对这样变量的访问就根本不需要考虑并发带来的影响。因此,内核专门提供了一种数据类型atomic_t、atomic64_t,用它来定义的变量为原子变量,其类型定义如下。
/* include/linux/types.h */
typedef struct
int counter;
atomic_t;
#ifdef CONFIG_64BIT
typedef struct
long counter;
atomic64_t;
#endif
原子变量其实是一个整型变量。对于整型变量,有的处理器专门提供一些指令来实现原子操作(比如ARM处理器中的swp指令),内核就会使用这些指令来对原子变量进行访问
内核将会使用其他的手段来保证对它访问的原子性,比如中断屏蔽。
int atomic_add_return(int i,atomic_t *v);
int atomic_sub_return(int i,atomic_t *v);
int atomic_add_negative(int i,atomic_t *v)
void atomic_add(int i,atomic_t *v);
void atomic_sub(int i,atomic_t *v);
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);
atomic_dec_return(v)
atomic_inc_return(v)
atomic_sub_and_test(i,v)
atomic_dec_and_test(v)
atomic_inc_and_test(v)
atomic_xchg(ptr,v)
atomic_cmpxchg(v,old,new)
void atomic_clear_mask(unsigned long mask,atomic_t *v);
void atomic_set_mask(unsigned int mask,atomic_t *v);
void set_bit(int nr,volatile unsigned long *addr);
void clear_bit(int nr,volatile unsigned long *addr);
void change_bit(int nr,volatile unsigned long *addr);
int test_and_set_bit(int nr,volatile unsigned long *addr);
int test_and_clear_bit(int nr,volatile unsigned long *addr);
int test_and_change_bit(int nr,volatile unsigned long *addr);
atomic_read:读取原子变量v的值。
atomic_set(v,i):设置原子变量v的值为i。
atomic_add、atomic_sub:将原子变量加上i或减去i,加“_return”表示还要返回修改后的值,加“_negative”表示当结果为负返回真。
…… 略 ……
是内核中的另一种互斥手段,在访问共享资源之前,首先要获得自旋锁,访问完共享资源后解锁。其他内核执行路径(其他人)如果没有竞争到锁,只能忙等待,所以自旋锁是一种忙等锁。所谓“忙等”,就是要持续占用CPU,一直循环,知道获得锁为止。
内核中自旋锁的类型是spinlock_t,相关的API如下。
spin_lock_init(_lock)
void spin_lock(spinlock_t *lock);
void spin_lock_irq(spinlock_t *lock);
spin_lock_irqsave(lock,flags)
void spin_lock_bh(spinlock_t *lock);
int spin_trylock(spinlock_t *lock);
int spin_trylock_bh(spinlock_t *lock);
int spin_trylock_irq(spinlock_t *lock);
void spin_unlock(spinlock_t *lock);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock,unsigned long flags);
void spin_unlock_bh(spinlock_t *lock);
spin_lock_init:初始化自旋锁,在使用自旋锁之前必须要初始化。
spin_lock:获取自旋锁,如果不能获得自旋锁,则进行忙等待。
spin_lock_bh:获取自旋锁并禁止下半部。
spin_lock_irq:获取自旋锁并禁止中断。
spin_lock_irqsave:获取自旋锁并禁止中断,保存中断屏蔽状态到flags中。
spin_trylock:尝试获取自旋锁,即使不能获取,也立即返回,返回值为0表示成功获得自旋锁,否则表示没有获得自旋锁。其他的变体和spin_lock变体的意义相同。
spin_unlock:释放自旋锁,其他的unlock版本可以依据前面的解释判断其作用。
在i++的例子中,我们可以使用下面的代码来使自旋锁对i的操作进行互斥。
int i = 5;
/* 定义自旋锁 */
spinlock_t lock;
/* 用于保存中断屏蔽状态的变量 */
unsigned long flags;
/* 使用自旋锁之前必须初始化自旋锁 */
spin_lock_init(&lock);
/* 访问共享资源之前获得自旋锁,禁止中断,并将之前的中断屏蔽状态保存在flags变量中 */
spin_lock_irqsave(&lock,flags);
/* 访问共享资源 */
i++;
/* 共享资源访问完成后释放自旋锁,用flags的值恢复中断屏蔽的状态 */
spin_unlock_irqrestore(&lock,flags);
从上面的例子可以看到,自旋锁的使用还是比较直观的,基本的步骤是定义锁、初始化锁、访问共享资源之前获得锁、访问完成之后释放锁。
关于自旋锁的一些重要特性和使用注意事项总结如下。
(1)获得自旋锁的临界代码段执行时间不宜过长,因为是忙等锁,如果临界代码段执行时间过长,就意味着其他想要获得锁的内核执行路径会进行长时间的忙等待,这会影响系统的工作效率。
(2)在获得锁的期间,不能够调用可能会引起进程切换的函数,因为这会增加持锁的时间,导致其他要获取锁的代码进行更长时间的等待,更糟糕的情况是,如果新调度的进程也要获取同样的自旋锁,那么会导致死锁。
(3)自旋锁是不可递归的,即获得锁之后不能再获得自旋锁,否则会因为等待一个不能获得的锁而将自己锁死。
(4)自旋锁可以用于中断上下文中,因为它是忙等锁,所以并不会引起进程的切换。
(5)如果中断中也要访问共享资源,则在非中断处理代码中访问共享资源之前应该先禁止中断再获取自旋锁,即应该使用spin_lock_irq或spin_lock_irqsave来获得自旋锁。如果不这样的话,即使获得了锁中断也会发生,在中断中访问共享资源之前,中断也要获得一个已经被获得的自旋锁,那么中断将会被锁死,中断的下半部也有类似的情况。另外,推荐使用spin_lock_irqsave而不是spin_lock_irq,原因同中断屏蔽中相关的描述。
(6)虽然一直都在说自旋锁是忙等锁,但是在单处理器的无抢占内核中,单纯的自旋锁(指不是禁止中断,禁止下半部的一些变体)获取操作其实是一个空操作,而在单处理器的可抢占内核中也仅仅是禁止抢占而已(但这会使高优先级的就绪进程的执行时间稍微推后一些)。真正的忙等待的特性只有在多处理器中才会体现出来,不过作为驱动开发者,我们不应该来假设驱动的运行环境,或者说都应该假设成运行在多处理器的可抢占系统上。
在并发的方式中有读—读并发、读—写并发和写—写并发三种,很显然,一般的资源的读操作并不会修改它的值(对某些读清零的硬件寄存器除外),因此读和读之间是完全允许并发的。但是使用自旋锁,读操作也会被加锁,从而阻止了另外一个读操作。为了提高并发的效率,必须要降低锁的粒度,以允许读和读之间的并发。为此,内核提供了一种允许读和读并发的锁,叫读写锁。
rwlock_init(lock)
read_trylock(lock)
write_trylock(lock)
read_lock(lock)
write_lock(lock)
read_lock_irq(lock)
read_lock_irqsave(lock,flags)
read_lock_bh(lock)
write_lock_irq(lock)
write_lock_irqsave(lock,flags)
write_lock_bh(lock)
read_unlock(lock)
write_unlock(lock)
read_unlock_irq(lock)
read_unlock_irqrestore(lock,flags)
read_unlock_bh(lock)
write_unlock_irq(lock)
write_unlock_irqrestore(lock,flags)
write_unlock_bh(lock)
读写锁的使用也需经历定义、初始化、加锁和解锁的过程,只是要改变变量的值需先获取写锁,值改变完成后再解除写锁,读操作则用读锁。这样,当一个内核执行路径在获取变量的值时,如果有另一条执行路径也要来获取变量的值,则读锁可以正常获得,从而另一条路径也能获取变量的值。但如果有一个写在进行,那不管是写锁还是读锁都不能获得,只有当写锁释放了之后才可以。很明显,使用读写锁降低了锁的粒度,即对锁的控制更加精细了,从而获得了更高的并发性,带来了更高的工作效率。
int i = 5;
unsigned long flags;
rwlock_t lock;
/* 使用之前先初始化读写锁 */
rwlock_init(&lock);
/* 要改变变量的值之前获取写锁 */
write_lock_irqsave(&lock,flags);
i++;
write_unlock_irqrestore(&lock,flags);
int v;
/* 只是获取变量的值先获得读锁 */
read_lock_irqsave(&lock,flags);
v = i;
read_unlock_irqrestore(&lock,flags);
自旋锁不允许读和读之间的并发,读写锁则更进了一步,允许读和读之间的并发,顺序锁又更进了一步,允许读和写之间的并发。为了实现这一需求,顺序锁在读时不上锁,也就意味着在读的期间允许写,但是在读之前需要先读取一个顺序值,读操作完成后,再次读取顺序值,如果两者相等,说明在读的过程中没有发生过写操作,否则要重新读取。显然,写操作要上锁,并且要更新顺序值。顺序锁特别适合读很多而写比较少的场合,否则由于反复的读操作,也不一定能够获取较高的效率。顺序锁的数据类型是seqlock_t,其类型定义如下。
typedef struct
struct seqcount seqcount;
spinlock_t lock;
seqlock_t;
很显然,顺序锁使用了自旋锁的机制,并且有一个顺序值 seqcount。顺序锁的主要API如下。
seqlock_init(x)
unsigned read_seqbegin(const seqlock_t *sl);
unsigned read_seqretry(const seqlock_t *sl,unsigned start);
void write_seqlock(seqlock_t *sl);
void write_sequnlock(seqlock_t *sl);
void write_seqlock_bh(seqlock_t *sl);
void write_sequnlock_bh(seqlock_t *sl);
void write_seqlock_irq(seqlock_t *sl);
void write_sequnlock_irq(seqlock_t *sl);
write_seqlock_irqsave(lock,flags)
void write_sequnlock_irqrestore(seqlock_t *sl,unsigned long flags);
seqlock_init:初始化顺序锁。
read_seqbegin:读之前获取顺序值,函数返回顺序值。
read_seqretry:读之后验证顺序值是否发生了变化,返回1表示需要重读,返回0表示读成功。
write_seqlock:写之前加锁,其他的变体请参照自旋锁。
write_sequnlock:写之后解锁,其他的变体请参照自旋锁。
在i++的例子中,我们可以使用下面的代码来使顺序锁对i的操作进行互斥。
int i = 5;
unsigned long flags;
/* 定义顺序锁 */
seqlock_t lock;
/* 使用之前必须初始化顺序锁 */
seqlock_init(&lock);
int v;
unsigned start;
do
/* 读之前要先获取顺序值 */
start = read_seqbegin(&lock);
v = i;
/* 读完之后检查顺序值是否发生了变化,如果是,则要重读 */
while (read_seqretry(&lock,start));
/* 写之前获取顺序锁 */
write_seqlock_irqsave(&lock,flags);
i++;
/* 写完后释放顺序锁 */
write_sequnlock_irqrestore(&lock,flags);
前面所讨论的锁机制都有一个限制,那就是在锁获得期间不能调用调度器,即不能引起进程切换。但是内核中有很多函数都可能会触发对调度器的调用(在中断的章节列举过一些),这给驱动开发带来了一些麻烦。另外,我们也知道,对于忙等锁来说,当临界代码段执行的时间比较长的时候,会降低系统的效率。为此内核提供了一种叫信号量的机制来取消这一限制,它的数据类型定义如下。
struct semaphore
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
;
可以看到,它有一个count成员,这是用来记录信号量资源的情况的,当count的值不为0时是可以获得信号量的,当count的值为0时信号量就不能被获取,这也说明信号量可以同时被多个进程所持有。我们还看到了一个 wait_list 成员,不难猜想,当信号量不能获取时,当前的进程就应该休眠了。最后,lock 成员在提示我们,信号量在底层其实使用了自旋锁的机制。
信号量最常用的API接口如下:
void sema_init(struct semaphore *sem,int val);
void down(struct semaphore *sem);
int down_interruptible(struct semaphore *sem);
int down_trylock(struct semaphore *sem);
int down_timeout(struct semaphore *sem,long jiffies);
void up(struct semaphore *sem);
sema_init:用于初始化信号量,val是赋给count成员的初值,这样就可以有val个进程同时获得信号量。
down:获取信号量(信号量的值减1),当信号量的值不为0时,可以立即获取信号量,否则进程休眠。
down_interruptible:同down,但是能够被信号唤醒。
down_trylock:只是尝试获取信号量,如果不能获取立即返回,返回0表示成功获取,返回1表示获取失败。
down_timeout:同down,但是在jiffies个时钟周期后如果还没有获取信号量,则超时返回,返回0表示成功获取信号量,返回负值表示超时。
up:释放信号量(信号量的值加1),如果有进程等待信号量,则唤醒这些进程。
(1)信号量可以被多个进程同时持有,当给信号量赋初值1时,信号量成为二值信号量,也称为互斥信号量,可以用来互斥。
(2)如果不能获得信号量,则进程休眠,调度其他的进程执行,不会进行忙等待。
(3)因为获取信号量可能会引起进程切换,所以不能用在中断上下文中,如果必须要用,只能使用down_trylock。不过在中断上下文中可以使用up释放信号量,从而唤醒其他进程。
(4)持有信号量期间可以调用调度器,但需要特别注意是否会产生死锁。
(5)信号量的开销比较大,在不违背自旋锁的使用规则的情况下,应该优先使用自旋锁。
信号量除了不能用于中断上下文,还有一个缺点就是不是很智能。在获取信号量的代码中,只要信号量的值为0,进程马上就休眠了。但是更一般的情况是,在不会等待太长的时间后,信号量就可以马上获得,那么信号量的操作就要经历使进程先休眠再被唤醒的一个漫长过程。可以在信号量不能获取的时候,稍微耐心地等待一小段时间,如果在这段时间能够获取信号量,那么获取信号量的操作就可以立即返回,否则再将进程休眠也不迟。为了实现这种比较智能化的信号量,内核提供了另外一种专门用于互斥的高效率信号量,也就是互斥量,也叫互斥体,类型为struct mutex。
相关的API如下:
mutex_init(mutex)
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_trylock(struct mutex *lock);
void mutex_unlock(struct mutex *lock);
有了前面互斥操作的基础,使用互斥量来做互斥也就很容易实现了,示例代码如下。
int i = 5;
/* 定义互斥量 */
struct mutex lock;
/* 使用之前初始化互斥量 */
mutex_init(&lock);
/* 访问共享资源之前获得互斥量 */
mutex_lock(&lock);
i++;
/* 访问完共享资源后释放互斥量 */
mutex_unlock(&lock);
(1)要在同一上下文对互斥量上锁和解锁,比如不能在读进程中上锁,也不能在写进程中解锁。
(2)和自旋锁一样,互斥量的上锁是不能递归的。
(3)当持有互斥量时,不能退出进程。
(4)不能用于中断上下文,即使mutex_trylock也不行。
(5)持有互斥量期间,可以调用可能会引起进程切换的函数。
(6)在不违背自旋锁的使用规则时,应该优先使用自旋锁。
(7)在不能使用自旋锁但不违背互斥量的使用规则时,应该优先使用互斥量,而不是信号量。
RCU(Read-Copy Update)机制即读—复制—更新。RCU机制对共享资源的访问都是通过指针来进行的,读者(对共享资源发起读访问操作的代码)通过对该指针进行解引用,来获取想要的数据。写者在发起写访问操作的时候,并不是去写以前的共享资源内存,而是另起炉灶,重新分配一片内存空间,复制以前的数据到新开辟的内存空间(有时不用复制),然后修改新分配的内存空间里面的内容。当写结束后,等待所有的读者都完成了对原有内存空间的读取后,将读的指针更新,指向新的内存空间,之后的读操作将会得到更新后的数据。这非常适合于读访问多、写访问少的情况,它尽可能地减少了对锁的使用。
内核使用RCU机制实现了对数组、链表和NMI(不可屏蔽中断)操作的大量API,不过要能理解RCU,通过下面几个最简单的API即可。
void rcu_read_lock(void);
rcu_dereference(p)
void rcu_read_unlock(void);
rcu_assign_pointer(p,v)
void synchronize_rcu(void);
rcu_read_lock:读者进入临界区。
rcu_dereference:读者用于获取共享资源的内存区指针。
rcu_read_unlock:读者退出临界区。
rcu_assign_pointer:用新指针更新老指针。
synchronize_rcu:等待之前的读者完成读操作。
使用RCU机制最简单的示例代码如下:
struct foo
int a;
char b;
long c;
;
DEFINE_SPINLOCK(foo_mutex);/* 定义一个自选锁 */
struct foo *gbl_foo; /* 一个指向共享资源数据的全局指针 */
/* 写时调用*/
void foo_update_a(int new_a)
struct foo *new_fp;
struct foo *old_fp; /* 用于存储旧指针,待所有读者退出后才能用于释放 */
new_fp = kmalloc(sizeof(*new_fp),GFP_KERNEL);
spin_lock(&foo_mutex); /*写锁*/
old_fp = gbl_foo;
*new_fp = *old_fp;
new_fp->a = new_a; /* 假装修改一下data,代表要修改的部分数据,此处只修改了一个成员变量 foo.a*/
rcu_assign_pointer(gbl_foo,new_fp); /* 给gbl_foo赋予新指针地址*/
spin_unlock(&foo_mutex); /*解写锁*/
synchronize_rcu(); /*等待其他用户对老数据的访问全部结束*/
kfree(old_fp); /* 释放旧指针指向的内存区域 */
/*读时调用*/
int foo_get_a(void)
int retval;
rcu_read_lock();
retval = rcu_dereference(gbl_foo)->a;
rcu_read_unlock();
return retval;
讨论完内核中的互斥后,接下来我们来看看内核中的同步。同步是指内核中的执行路径需要按照一定的顺序来进行,例如执行路径A要继续往下执行则必须要保证执行路径B执行到某一个点才行。以一个ADC设备来说,假设一个驱动中的一个执行路径是将ADC 采集到的数据做某些转换操作(比如将若干次采样结果做平均),而另一个执行路径专门负责ADC采样,那么做转换操作的执行路径要等待做采样的执行路径。
同步可以用信号量来实现,就以上面的ADC驱动来说,可以先初始化一个值为0的信号量,做转换操作的执行路径先用down来获取这个信号量,如果在这之前没有采集到数据,那么做转换操作的路径将会休眠等待。当做采样的路径完成采样后,调用up释放信号量,那么做转换操作的执行路径将会被唤醒。这就保证了采样发生在转换之前,也就完成了采样和转换之间的同步。
不过内核专门提供了一个完成量来实现该操作,完成量的结构类型定义如下。
struct completion
unsigned int done;
wait_queue_head_t wait;
;
done是是否完成的状态,是一个计数值,为0表示未完成。wait是一个等待队列头,回想前面阻塞操作的知识,不难想到完成量的工作原理。当done为0时进程阻塞,当内核的其他执行路径使done的值大于0时,负责唤醒被阻塞在这个完成量上的进程。
完成量的主要API如下。
void init_completion(struct completion *x);
wait_for_completion(struct completion *);
wait_for_completion_interruptible(struct completion *x);
unsigned long wait_for_completion_timeout(struct completion *x,unsigned long timeout);
long wait_for_completion_interruptible_timeout(struct completion *x,unsigned long timeout);
bool try_wait_for_completion(struct completion *x);
void complete(struct completion *);
void complete_all(struct completion *);
完成量的使用例子如下。
/* 定义完成量 */
struct completion comp;
/* 使用之前初始化完成量 */
init_completion(&comp